/*
 * Collie - An asynchronous event-driven network framework using Dlang development
 *
 * Copyright (C) 2015-2017  Shanghai Putao Technology Co., Ltd
 *
 * Developer: putao's Dlang team
 *
 * Licensed under the Apache-2.0 License.
 *
 */
module collie.codec.http.codec.http1xcodec;

import kiss.logger;
import collie.codec.http.codec.httpcodec;
import collie.codec.http.errocode;
import collie.codec.http.headers;
import collie.codec.http.httpmessage;
import collie.codec.http.httptansaction;
import collie.codec.http.parser;
import collie.utils.string;
import std.array;
import std.conv;
import std.traits;

/**
 * HTTPCodec implementation reporting CodecProtocol.HTTP_1_X.
 *
 * Ingress: bytes handed to onIngress are run through an HTTPParser whose
 * events are forwarded to the registered CallBack.
 * Egress: generateHeader / generateBody / generateChunk* / generateEOM
 * serialize a message into an HttpWriteBuffer and return the number of
 * bytes they appended.
 */
class HTTP1XCodec : HTTPCodec
{
    /**
     * Params:
     *   direction     = transport direction this codec serves
     *   maxHeaderSize = limit handed to the parser each time it is reset
     */
    this(TransportDirection direction, uint maxHeaderSize = (64 * 1024))
    {
        _transportDirection = direction;
        _finished = true;
        _maxHeaderSize = maxHeaderSize;

        // Route every parser event to the matching member handler.
        _parser.onUrl(&onUrl);
        _parser.onMessageBegin(&onMessageBegin);
        _parser.onHeaderComplete(&onHeadersComplete);
        _parser.onHeaderField(&onHeaderField);
        _parser.onHeaderValue(&onHeaderValue);
        _parser.onStatus(&onStatus);
        _parser.onChunkHeader(&onChunkHeader);
        _parser.onChunkComplete(&onChunkComplete);
        _parser.onBody(&onBody);
        _parser.onMessageComplete(&onMessageComplete);
    }

    /// Always CodecProtocol.HTTP_1_X.
    override CodecProtocol getProtocol()
    {
        return CodecProtocol.HTTP_1_X;
    }

    /// Direction given at construction.
    override TransportDirection getTransportDirection()
    {
        return _transportDirection;
    }

    /// This codec always hands out stream id 0.
    override StreamID createStream()
    {
        return 0;
    }

    /// True while an ingress message has begun but not yet completed.
    override bool isBusy()
    {
        return !_finished;
    }

    /// True when keep-alive is not in effect for the connection.
    override bool shouldClose()
    {
        return !_keepalive;
    }

    /// No-op in this codec.
    override void setParserPaused(bool paused)
    {
    }

    /// Registers the receiver of parser events; may be null.
    override void setCallback(CallBack callback)
    {
        _callback = callback;
    }

    /**
     * Feeds received bytes to the parser.
     *
     * The parser is reset first when the previous message has completed.
     * If the parser consumed fewer bytes than supplied and the connection
     * is not being upgraded, PROTOCOL_ERROR is reported via the callback.
     *
     * Returns: number of bytes the parser consumed.
     */
    override size_t onIngress(ubyte[] buf)
    {
        version(CollieDebugMode) logDebug("on Ingress!!");
        if (_finished)
        {
            _parser.rest(HTTPParserType.HTTP_BOTH, _maxHeaderSize);
        }

        auto size = _parser.httpParserExecute(buf);
        const bool incomplete = (size != buf.length) && (_parser.isUpgrade == false);
        if (incomplete && _transaction && _callback)
        {
            _callback.onError(_transaction, HTTPErrorCode.PROTOCOL_ERROR);
        }
        return cast(size_t) size;
    }

    /// Reports REMOTE_CLOSED to the current transaction (if any) and drops it.
    override void onConnectClose()
    {
        if (_transaction is null)
            return;

        _transaction.onErro(HTTPErrorCode.REMOTE_CLOSED);
        _transaction.handler = null;
        _transaction.transport = null;
        _transaction = null;
    }

    /// Reports TIME_OUT to the current transaction (if any); it stays attached.
    override void onTimeOut()
    {
        if (_transaction)
        {
            _transaction.onErro(HTTPErrorCode.TIME_OUT);
        }
    }

    /// Forgets txn if it is the transaction currently tracked by the codec.
    override void detach(HTTPTransaction txn)
    {
        if (txn is _transaction)
            _transaction = null;
    }

    /**
     * Serializes the start line and headers of msg into buffer.
     *
     * Downstream codecs write a status line, upstream codecs a request line.
     * Content-Length and Connection headers from msg are not copied through
     * verbatim: Content-Length is re-emitted at the end, and Connection is
     * regenerated from the keep-alive / upgrade state computed here.
     * Also decides whether the body will use chunked framing
     * (_egressChunked) for the subsequent generateBody/generateEOM calls.
     *
     * Returns: number of bytes appended to buffer.
     */
    override size_t generateHeader(
        HTTPTransaction txn,
        HTTPMessage msg,
        HttpWriteBuffer buffer,
        bool eom = false)
    {
        const bool upstream = (_transportDirection == TransportDirection.UPSTREAM);
        const size_t beforLen = buffer.length;
        auto hversion = msg.getHTTPVersion();

        _egressChunked = msg.chunked && !_egressUpgrade;
        _lastChunkWritten = false;

        bool hasTransferEncodingChunked = false;
        bool hasUpgradeHeader = false;
        bool is1xxResponse = false;
        bool ingorebody = false;

        _keepalive = _keepalive & msg.wantsKeepAlive;

        if (!upstream)
        {
            // Status line: HTTP/<maj>.<min> <code> <message>
            is1xxResponse = msg.is1xxResponse;
            appendLiteral(buffer, "HTTP/");
            appendLiteral(buffer, to!string(hversion.maj));
            appendLiteral(buffer, ".");
            appendLiteral(buffer, to!string(hversion.min));
            appendLiteral(buffer, " ");
            ushort code = msg.statusCode;
            ingorebody = responseBodyMustBeEmpty(code);
            appendLiteral(buffer, to!string(code));
            appendLiteral(buffer, " ");
            appendLiteral(buffer, msg.statusMessage);
        }
        else
        {
            // Request line: <method> <path> HTTP/<maj>.<min>
            appendLiteral(buffer, msg.methodString);
            appendLiteral(buffer, " ");
            appendLiteral(buffer, msg.getPath);
            appendLiteral(buffer, " HTTP/");
            appendLiteral(buffer, to!string(hversion.maj));
            appendLiteral(buffer, ".");
            appendLiteral(buffer, to!string(hversion.min));
            // Chunked egress is only allowed from HTTP/1.1 on.
            _mayChunkEgress = (hversion.maj == 1) && (hversion.min >= 1);
        }
        appendLiteral(buffer, "\r\n");

        _egressChunked &= _mayChunkEgress;

        string contLen;
        foreach (HTTPHeaderCode code, string key, string value; msg.getHeaders)
        {
            if (code == HTTPHeaderCode.CONTENT_LENGTH)
            {
                // Written once, after all other headers.
                contLen = value;
                continue;
            }
            else if (code == HTTPHeaderCode.CONNECTION)
            {
                // Never forwarded; only "close" influences the state.
                if (isSameIngnoreLowUp(value, "close"))
                {
                    _keepalive = false;
                }
                continue;
            }
            else if (code == HTTPHeaderCode.UPGRADE)
            {
                hasUpgradeHeader = true;
            }
            else if (!hasTransferEncodingChunked &&
                     code == HTTPHeaderCode.TRANSFER_ENCODING)
            {
                // Only "chunked" is passed on, and only if the peer's
                // HTTP version permits chunked egress.
                if (!isSameIngnoreLowUp(value, "chunked"))
                    continue;
                hasTransferEncodingChunked = true;
                if (!_mayChunkEgress)
                    continue;
            }

            appendLiteral(buffer, key);
            appendLiteral(buffer, ": ");
            appendLiteral(buffer, value);
            appendLiteral(buffer, "\r\n");
        }

        _inChunk = false;

        const bool responseNeedsFraming =
            (!upstream) && _keepalive && !ingorebody && !_egressUpgrade;
        // auto chunk POSTs and any request that came to us chunked
        const bool requestNeedsFraming =
            upstream && ((msg.method == HTTPMethod.HTTP_POST) || _egressChunked);
        const bool bodyCheck = responseNeedsFraming || requestNeedsFraming;

        // TODO: 400 a 1.0 POST with no content-length
        // clear _egressChunked if the header wasn't actually set
        _egressChunked &= hasTransferEncodingChunked;

        if (bodyCheck && contLen.length == 0 && !_egressChunked)
        {
            if (!hasTransferEncodingChunked && _mayChunkEgress)
            {
                // No length known: fall back to chunked framing.
                appendLiteral(buffer, "Transfer-Encoding: chunked\r\n");
                _egressChunked = true;
            }
            else
            {
                // Body can only be delimited by closing the connection.
                _keepalive = false;
            }
        }

        if (!is1xxResponse || upstream || hasUpgradeHeader)
        {
            appendLiteral(buffer, "Connection: ");
            if (hasUpgradeHeader)
            {
                appendLiteral(buffer, "upgrade\r\n");
                _keepalive = true;
            }
            else if (_keepalive)
                appendLiteral(buffer, "keep-alive\r\n");
            else
                appendLiteral(buffer, "close\r\n");
        }

        // NOTE(review): emitted for both directions, i.e. also on upstream
        // requests — confirm that is intended.
        appendLiteral(buffer, "Server: Collie\r\n");

        if (contLen.length > 0)
        {
            appendLiteral(buffer, "Content-Length: ");
            appendLiteral(buffer, contLen);
            appendLiteral(buffer, "\r\n");
        }

        appendLiteral(buffer, "\r\n");
        return buffer.length - beforLen;
    }

    /**
     * Writes body data, closes an open chunk with CRLF, and optionally
     * finishes the message via generateEOM.
     *
     * Returns: number of bytes appended to chain.
     */
    override size_t generateBody(HTTPTransaction txn,
        HttpWriteBuffer chain, in ubyte[] data,
        bool eom)
    {
        size_t rlen = chain.write(data);
        if (_egressChunked && _inChunk)
        {
            appendLiteral(chain, "\r\n");
            _inChunk = false;
            rlen += 2;
        }
        if (eom)
            rlen += generateEOM(txn, chain);
        return rlen;
    }

    /**
     * Writes the "<hex length>\r\n" chunk header when chunked egress is
     * active and marks a chunk as open.
     *
     * Returns: number of bytes appended (0 when not chunking).
     */
    override size_t generateChunkHeader(
        HTTPTransaction txn,
        HttpWriteBuffer buffer,
        size_t length)
    {
        // Debug output is gated like the other debug logs in this file so
        // it does not run for every chunk in normal builds.
        version(CollieDebugMode) logDebug("_egressChunked ", _egressChunked);
        if (!_egressChunked)
            return 0;

        import std.format;

        _inChunk = true;
        string lent = format("%x\r\n", length);
        version(CollieDebugMode) logDebug("length is : ", length, " x is: ", lent);
        appendLiteral(buffer, lent);
        return lent.length;
    }

    /**
     * Writes the CRLF that closes the currently open chunk, if any.
     *
     * Returns: 2 when the terminator was written, otherwise 0.
     */
    override size_t generateChunkTerminator(
        HTTPTransaction txn,
        HttpWriteBuffer buffer)
    {
        if (_egressChunked && _inChunk)
        {
            _inChunk = false;
            appendLiteral(buffer, "\r\n");
            return 2;
        }
        return 0;
    }

    /**
     * Finishes the egress message.
     *
     * With chunked egress this writes the last-chunk ("0\r\n", once) and the
     * final CRLF, except for a downstream response to a HEAD request, where
     * nothing is written. Clears the pending flag for this direction.
     *
     * Returns: number of bytes actually appended to buffer.
     */
    override size_t generateEOM(HTTPTransaction txn,
        HttpWriteBuffer buffer)
    {
        size_t rlen = 0;
        if (_egressChunked)
        {
            assert(!_inChunk);
            if (_headRequest && _transportDirection == TransportDirection.DOWNSTREAM)
            {
                // HEAD responses carry no body bytes: nothing written,
                // so nothing is added to the returned length.
                _lastChunkWritten = true;
            }
            else
            {
                // appending a 0\r\n only if it's not a HEAD and downstream request
                if (!_lastChunkWritten)
                {
                    _lastChunkWritten = true;
                    appendLiteral(buffer, "0\r\n");
                    rlen += 3;
                }
                appendLiteral(buffer, "\r\n");
                rlen += 2;
            }
        }

        switch (_transportDirection)
        {
            case TransportDirection.DOWNSTREAM:
                _responsePending = false;
                break;
            case TransportDirection.UPSTREAM:
                _requestPending = false;
                break;
            default:
                break;
        }
        return rlen;
    }

    /// This codec writes nothing for a stream reset; always returns 0.
    override size_t generateRstStream(HTTPTransaction txn,
        HttpWriteBuffer buffer, HTTPErrorCode code)
    {
        return 0;
    }

protected:

    /// Appends the raw bytes of a char/byte array to buffer.
    final void appendLiteral(T)(HttpWriteBuffer buffer, T[] data)
        if (isSomeChar!(Unqual!T) || is(Unqual!T == byte) || is(Unqual!T == ubyte))
    {
        buffer.write(cast(const(ubyte[])) data);
    }

    /// Parser event: a new message starts. Creates the message/transaction pair.
    void onMessageBegin(ref HTTPParser)
    {
        _finished = false;
        _headersComplete = false;
        _message = new HTTPMessage();

        if (_transportDirection == TransportDirection.DOWNSTREAM)
        {
            _requestPending = true;
            _responsePending = true;
        }
        if (_transportDirection == TransportDirection.UPSTREAM)
        {
            _is1xxResponse = false;
        }

        _transaction = new HTTPTransaction(_transportDirection, 0, 0);
        if (_callback)
            _callback.onMessageBegin(_transaction, _message);

        // Reset the accumulators shared by URL/status/header parsing.
        _currtKey = (ubyte[]).init;
        _currtValue = (ubyte[]).init;
    }

    /// Parser event: all headers parsed. Records version/keep-alive/upgrade state.
    void onHeadersComplete(ref HTTPParser parser)
    {
        _mayChunkEgress = ((parser.major == 1) && (parser.minor >= 1));
        _message.setHTTPVersion(cast(ubyte) parser.major, cast(ubyte) parser.minor);
        _egressUpgrade = parser.isUpgrade;
        _message.upgraded(parser.isUpgrade);

        int klive = parser.keepalive;
        version(CollieDebugMode) logDebug("++++++++++klive : ", klive);
        // Only the value 1 enables keep-alive; every other value disables it.
        _keepalive = (klive == 1);
        _message.wantsKeepAlive(_keepalive);
        _headersComplete = true;

        if (_message.upgraded)
        {
            string upstring = _message.getHeaders.getSingleOrEmpty(HTTPHeaderCode.UPGRADE);
            CodecProtocol pro = getProtocolFormString(upstring);
            if (_callback)
                _callback.onNativeProtocolUpgrade(_transaction, pro, upstring, _message);
        }
        else
        {
            if (_callback)
                _callback.onHeadersComplete(_transaction, _message);
        }
    }

    /// Parser event: message fully received.
    void onMessageComplete(ref HTTPParser parser)
    {
        _finished = true;
        switch (_transportDirection)
        {
            case TransportDirection.DOWNSTREAM:
                _requestPending = false;
                // else there was no match, OR we upgraded to http/1.1 OR someone specified
                // a non-native protocol in the setAllowedUpgradeProtocols. No-ops
                break;
            case TransportDirection.UPSTREAM:
                _responsePending = _is1xxResponse;
                break;
            default:
                break;
        }
        if (_callback)
            _callback.onMessageComplete(_transaction, parser.isUpgrade);
    }

    /// Parser event: chunk header seen; forwards the parser's contentLength.
    void onChunkHeader(ref HTTPParser parser)
    {
        if (_callback)
            _callback.onChunkHeader(_transaction, cast(size_t) parser.contentLength);
    }

    /// Parser event: chunk finished.
    void onChunkComplete(ref HTTPParser parser)
    {
        if (_callback)
            _callback.onChunkComplete(_transaction);
    }

    /// Parser event: URL fragment. Accumulates until finish, then stores it.
    void onUrl(ref HTTPParser parser, ubyte[] data, bool finish)
    {
        const method = parser.methodCode();
        _message.method = method;
        _connectRequest = (method == HTTPMethod.HTTP_CONNECT);

        // If this is a headers-only request, we shouldn't send
        // an entity-body in the response.
        _headRequest = (method == HTTPMethod.HTTP_HEAD);

        _currtKey ~= data;
        if (finish)
        {
            _message.url = cast(string)(_currtKey);
            _currtKey = (ubyte[]).init;
        }
    }

    /// Parser event: status-text fragment. Accumulates until finish.
    void onStatus(ref HTTPParser parser, ubyte[] data, bool finish)
    {
        _currtKey ~= data;
        if (finish)
        {
            string sdata = cast(string) _currtKey;
            _currtKey = (ubyte[]).init;
            _message.statusCode(cast(ushort) parser.statusCode);
            _message.statusMessage(sdata);
        }
    }

    /// Parser event: header-name fragment; committed by onHeaderValue.
    void onHeaderField(ref HTTPParser parser, ubyte[] data, bool finish)
    {
        _currtKey ~= data;
    }

    /// Parser event: header-value fragment. On finish adds key/value to the message.
    void onHeaderValue(ref HTTPParser parser, ubyte[] data, bool finish)
    {
        _currtValue ~= data;
        if (finish)
        {
            string key = cast(string) _currtKey;
            _currtKey = (ubyte[]).init;
            string value = cast(string) _currtValue;
            _currtValue = (ubyte[]).init;
            version(CollieDebugMode) logDebug("http header: \t", key, " : ", value);
            _message.getHeaders.add(key, value);
        }
    }

    /// Parser event: body bytes; forwarded to the callback when one is set.
    void onBody(ref HTTPParser parser, ubyte[] data, bool finish)
    {
        version(CollieDebugMode) debug logDebug("on boday, length : ", data.length);
        // Guarded like every other callback use: the callback may be unset.
        if (_callback)
            _callback.onBody(_transaction, data);
    }

    /// True for status codes 1xx, 204 and 304.
    bool responseBodyMustBeEmpty(ushort status)
    {
        return (status == 304 || status == 204 ||
                (100 <= status && status < 200));
    }

private:
    TransportDirection _transportDirection;
    CallBack _callback;
    HTTPTransaction _transaction;
    HTTPMessage _message;
    ubyte[] _currtKey;   // accumulator for URL / status text / header name
    ubyte[] _currtValue; // accumulator for header value
    HTTPParser _parser;

    uint _maxHeaderSize;
    bool _finished;

private:
    bool _parserActive = false;
    bool _pendingEOF = false;
    bool _parserPaused = false;
    bool _parserError = false;
    bool _requestPending = false;
    bool _responsePending = false;
    bool _egressChunked = false;
    bool _inChunk = false;
    bool _lastChunkWritten = false;
    bool _keepalive = false;
    bool _disableKeepalivePending = false;
    bool _connectRequest = false;
    bool _headRequest = false;
    bool _expectNoResponseBody = false;
    bool _mayChunkEgress = false;
    bool _is1xxResponse = false;
    bool _inRecvLastChunk = false;
    bool _ingressUpgrade = false;
    bool _ingressUpgradeComplete = false;
    bool _egressUpgrade = false;
    bool _nativeUpgrade = false;
    bool _headersComplete = false;
}